# 23 multiprocessing 多进程
Python有GIL(全局解释器锁),threading虽然能并发,但同一时刻只有一个线程在执行Python代码。要实现真正的多核并行,得用multiprocessing——它开多个进程,每个进程有独立的GIL,互不影响。
multiprocessing适合CPU密集型任务(大量计算、数据处理),threading适合I/O密集型任务(网络请求、文件读写)。
## 一、创建进程

### 1.1 基本用法

```python
from multiprocessing import Process
import os
def worker(num):
    """Print the PID of the process running this function and the argument it got."""
    pid = os.getpid()
    print(f"进程 {pid}, 参数: {num}")


if __name__ == '__main__':
    # Build the child process; `args` must be a tuple, hence the trailing comma.
    p = Process(target=worker, args=(1,))
    p.start()  # launch the child; worker(1) runs there
    p.join()   # block until the child has exited
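    print(f"主进程 {os.getpid()} 继续")
```

注意:在Windows上必须用 `if __name__ == '__main__':` 保护。Windows(以及macOS从Python 3.8起)默认用spawn方式启动子进程,子进程会重新导入主模块;没有这层保护,创建进程的代码会在子进程中再次执行并报错。

### 1.2 多个进程

```python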
from multiprocessing import Process
import os
def worker(num):
    """Report the PID and argument, simulate two seconds of work, then report completion."""
    import time  # local import, as in the original snippet

    print(f"进程 {os.getpid()}, 参数: {num}")
    time.sleep(2)
    print(f"进程 {num} 完成")


if __name__ == '__main__':
    # Create one Process object per argument ...
    processes = [Process(target=worker, args=(i,)) for i in range(5)]
    # ... launch them all so they sleep concurrently ...
    for proc in processes:
        proc.start()
    # ... and wait until every child has finished.
    for proc in processes:
        proc.join()
    print("所有进程完成")
```

### 1.3 继承Process类

```python
from multiprocessing import Process
import os
class MyProcess(Process):
    """Process subclass: the work lives in run(), which start() invokes in the child."""

    def __init__(self, num):
        # The base-class constructor has to run before the object is used.
        super().__init__()
        self.num = num

    def run(self):
        """Print the PID of the executing process and the stored argument."""
        current_pid = os.getpid()
        print(f"进程 {current_pid}, 参数: {self.num}")


if __name__ == '__main__':
    p = MyProcess(1)
    p.start()
    p.join()
```

## 二、进程池

### 2.1 Pool

进程池会复用一组固定数量的工作进程,避免为每个任务频繁创建和销毁进程。

```python
from multiprocessing import Pool
def square(x):
    """Return x multiplied by itself."""
    product = x * x
    return product


if __name__ == '__main__':
    inputs = [1, 2, 3, 4, 5]
    with Pool(4) as pool:  # four worker processes
        results = pool.map(square, inputs)
    print(results)  # [1, 4, 9, 16, 25]
```

### 2.2 apply和apply_async

```python
from multiprocessing import Pool
import time
def task(x):
    """Sleep one second to simulate work, then return the square of x."""
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    with Pool(4) as pool:
        # apply: blocks until this single call has returned
        blocking_value = pool.apply(task, (5,))
        print(f"apply结果: {blocking_value}")

        # apply_async: returns an AsyncResult at once; get() waits for the value
        pending = pool.apply_async(task, (10,))
        print(f"异步结果: {pending.get()}")

        # map: run task over every element in parallel, blocking until all finish
        mapped = pool.map(task, [1, 2, 3, 4, 5])
        print(f"map结果: {mapped}")

        # map_async: the non-blocking counterpart of map
        async_results = pool.map_async(task, [1, 2, 3])
        print(f"异步map结果: {async_results.get()}")
```

### 2.3 starmap

当函数有多个参数时使用:`starmap`会把每个元组解包成位置参数,例如`(1, 2)`对应调用`add(1, 2)`。

```python
from multiprocessing import Pool
def add(x, y):
    """Return the sum of the two arguments."""
    total = x + y
    return total


if __name__ == '__main__':
    pairs = [(1, 2), (3, 4), (5, 6)]
    with Pool(4) as pool:
        # starmap unpacks each tuple into positional arguments: add(*pair)
        results = pool.starmap(add, pairs)
    print(results)  # [3, 7, 11]
```

## 三、进程间通信

### 3.1 Queue:队列

```python
from multiprocessing import Process, Queue
def producer(q):
    """Put the integers 0..4 on the queue, logging each one as it goes in."""
    for value in range(5):
        q.put(value)
        print(f"生产: {value}")


def consumer(q):
    """Print queue items until the None stop signal is received."""
    while (item := q.get()) is not None:
        print(f"消费: {item}")


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    for proc in (p1, p2):
        proc.start()
    # Only after the producer has finished is it safe to enqueue the stop signal.
    p1.join()
    q.put(None)  # stop signal for the consumer
    p2.join()
```

### 3.2 Pipe:管道

`Pipe()`返回一对连接对象(默认双向),用于两个进程之间的直接通信。

```python
from multiprocessing import Process, Pipe
def sender(conn):
    """Send two greeting strings, then a None end marker, and close this end.

    The explicit None marker lets the receiver stop even when some other copy
    of the write end is still open (for example one inherited by a forked
    process). In that situation recv() would block forever instead of
    raising EOFError.
    """
    conn.send("你好")
    conn.send("世界")
    conn.send(None)  # end-of-stream marker
    conn.close()


def receiver(conn):
    """Print received messages until the None marker arrives or the pipe hits EOF."""
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            # Every copy of the other end has been closed.
            break
        if msg is None:
            break
        print(f"收到: {msg}")


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p1 = Process(target=sender, args=(child_conn,))
    p2 = Process(target=receiver, args=(parent_conn,))
    p1.start()
    # The sender child now has its own copy of child_conn. Close ours so EOF
    # is reachable, and so the receiver started next does not inherit an open
    # write end under the fork start method.
    child_conn.close()
    p2.start()
    p1.join()
    p2.join()
```

## 四、共享内存

### 4.1 Value

`Value`在共享内存中保存单个值。注意`counter.value += 1`是"读取-修改-写回"多步操作,并不是原子的;多个进程同时修改时必须加锁,否则会丢失更新。

```python
from multiprocessing import Process, Value
def increment(counter):
    """Add 1 to the shared counter 100000 times.

    ``counter.value += 1`` reads and then writes the value; it is not atomic.
    Without synchronisation, concurrent processes overwrite each other's
    updates and the final total comes out below 500000. The increment is
    therefore done while holding the lock that ``Value`` creates by default
    (``get_lock()``).
    """
    for _ in range(100000):
        with counter.get_lock():
            counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)  # 'i' = C signed int
    processes = [Process(target=increment, args=(counter,)) for _ in range(5)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"结果: {counter.value}")  # 500000
```

### 4.2 Array

共享数组。

```python
from multiprocessing import Process, Array
def modify(arr, index, value):
    """Store value at position index of arr (a shared Array in this example)."""
    arr[index] = value


if __name__ == '__main__':
    arr = Array('i', [0] * 5)  # 'i' = C signed int; five zero-initialised slots
    processes = [
        Process(target=modify, args=(arr, idx, idx * 10))
        for idx in range(5)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(list(arr))  # [0, 10, 20, 30, 40]
```

### 4.3 Manager

更灵活的共享对象:`Manager`启动一个服务进程来托管对象,支持`dict`、`list`等类型,但各进程通过代理访问,速度比`Value`/`Array`慢。

```python
from multiprocessing import Process, Manager
def worker(shared_dict, shared_list, key, value):
    """Record value under key in the shared dict and append it to the shared list."""
    shared_dict[key] = value
    shared_list.append(value)


if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        processes = [
            Process(target=worker, args=(shared_dict, shared_list, f"key{n}", n))
            for n in range(5)
        ]
        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()
        # Read the proxies while the manager is still running (inside the with
        # block). The insertion order depends on how the processes were scheduled.
        print(dict(shared_dict))
        print(list(shared_list))
```

## 五、进程同步

### 5.1 Lock

```python
from multiprocessing import Process, Lock, Value
def safe_increment(counter, lock):
    """Add 1 to counter.value 100000 times, holding lock around each update."""
    iterations = 100000
    for _ in range(iterations):
        # While the lock is held, no other process can interleave with this
        # read-modify-write of the shared value.
        with lock:
            counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()
    worker_count = 5
    processes = [
        Process(target=safe_increment, args=(counter, lock))
        for _ in range(worker_count)
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(f"结果: {counter.value}")
```

### 5.2 Semaphore

```python
from multiprocessing import Process, Semaphore
import time
def limited_task(sem, id):
    """Hold one semaphore slot while doing two seconds of simulated work."""
    # The parameter name `id` shadows the builtin; it is kept so the
    # signature stays unchanged, and copied into a clearer local name.
    label = id
    with sem:
        print(f"任务 {label} 开始")
        time.sleep(2)
        print(f"任务 {label} 完成")


if __name__ == '__main__':
    sem = Semaphore(3)  # at most three tasks inside `with sem` at the same time
    processes = [Process(target=limited_task, args=(sem, k)) for k in range(10)]
    for proc in processes:
        proc.start()
    for p in processes:
        p.join()
```

## 六、实战场景

### 6.1 并行计算

```python
from multiprocessing import Pool
import time
def cpu_intensive(n):
    """CPU-bound workload: return the sum of k*k for k in 0 .. n-1."""
    return sum(k * k for k in range(n))


if __name__ == '__main__':
    numbers = [10**7] * 8  # eight identical heavy jobs

    # Serial baseline: every job runs in this process, one after another.
    start = time.time()
    results = list(map(cpu_intensive, numbers))
    print(f"串行: {time.time() - start:.2f}秒")

    # Parallel run: Pool() with no argument starts os.cpu_count() workers.
    start = time.time()
    with Pool() as pool:
        results = pool.map(cpu_intensive, numbers)
    print(f"并行: {time.time() - start:.2f}秒")
```

### 6.2 批量文件处理

```python
from multiprocessing import Pool
from pathlib import Path
def process_file(filepath, encoding="utf-8"):
    """Read one text file and return a summary line with its character count.

    Args:
        filepath: Path (str or pathlib.Path) of the file to read.
        encoding: Text encoding used to decode the file. It defaults to UTF-8
            so the result does not depend on the platform's locale encoding,
            which is not UTF-8 on many Windows systems (e.g. GBK).

    Returns:
        A string of the form "<filepath>: <n> 字符".
    """
    with open(filepath, 'r', encoding=encoding) as f:
        content = f.read()
    # Real per-file processing logic goes here.
    return f"{filepath}: {len(content)} 字符"


if __name__ == '__main__':
    files = list(Path("./data").glob("*.txt"))
    with Pool(4) as pool:
        results = pool.map(process_file, files)
    for r in results:
        print(r)
```

## 七、常见问题

### 7.1 Windows上的保护

```python
# Required on Windows (and whenever the "spawn" start method is used):
# child processes re-import the main module, so process-creating code must
# only run under this guard, not on import.
if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
```

### 7.2 序列化问题

传给子进程的函数和参数都要经过pickle序列化;lambda、嵌套函数等无法被pickle,只能使用模块顶层定义的普通函数。

```python
# Wrong: a lambda cannot be pickled, so it cannot be sent to worker processes
# pool.map(lambda x: x*2, [1,2,3])  # PicklingError
# Right: use an ordinary module-level function
def double(x):
    """Return x multiplied by 2."""
    result = x * 2
    return result
pool.map(double, [1, 2, 3])
```

## 八、与threading对比

| 特性 | threading | multiprocessing |
|---|---|---|
| 并发方式 | 线程 | 进程 |
| GIL限制 | 受限 | 不受限 |
| 适合任务 | I/O密集型 | CPU密集型 |
| 内存共享 | 共享内存 | 独立内存 |
| 创建开销 | 小 | 大 |
| 通信方式 | 共享变量 | Queue/Pipe/Manager |
## 九、总结

multiprocessing模块的核心:

| 组件 | 用途 |
|---|---|
| `Process` | 创建进程 |
| `Pool` | 进程池 |
| `Queue` | 进程安全队列 |
| `Pipe` | 两个进程间的管道 |
| `Value`/`Array` | 共享内存 |
| `Manager` | 共享对象管理器 |
| `Lock`/`Semaphore` | 进程同步 |
使用场景:
- CPU密集型任务(数据处理、图像处理、科学计算)
- 需要利用多核CPU
- 批量任务并行处理
记住:CPU密集型用multiprocessing,I/O密集型用threading或asyncio。